package com.my.service.task.logistic;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.*;

/**
 * Flink batch job that reads training records from an input file, computes
 * per-group logistic-regression weight vectors, and averages them element-wise
 * into a single final weight vector (one weight per feature).
 *
 * <p>Required program argument: {@code --input <path>} (the text file to read).
 */
public class LogisticTask {
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // Parse each input line into a LogisticInfo, then group by "groupfield"
        // and reduce each group to one weight vector.
        DataSet<String> text = env.readTextFile(params.get("input"));
        DataSet<LogisticInfo> mapResult = text.map(new LogisticMap());
        DataSet<ArrayList<Double>> weightsRes =
                mapResult.groupBy("groupfield").reduceGroup(new LogisticReducer());

        // collect() triggers execution of the job graph and ships the
        // per-group weight vectors back to the client.
        List<ArrayList<Double>> weights = weightsRes.collect();

        // Average the per-group vectors element-wise; the result is the final
        // weight for each feature. The original computed this and discarded
        // it, so the job produced no output — emit it to stdout.
        List<Double> finalWeight = averageWeights(weights);
        System.out.println("final feature weights: " + finalWeight);

        // NOTE(fix): the original called env.execute("logistic analysis")
        // here. collect() above has already executed the job, so a second
        // execute() with no new sinks throws "No new data sinks have been
        // defined since the last execution" — the call is removed.
    }

    /**
     * Averages a list of weight vectors element-wise.
     *
     * <p>Vectors may have different lengths; position {@code i} is averaged
     * over the total number of groups (matching the original behavior of
     * dividing every positional sum by {@code weights.size()}).
     *
     * @param weights per-group weight vectors; may be empty
     * @return the averaged weight per feature index, in ascending index order;
     *         empty when {@code weights} is empty (avoids division by zero)
     */
    private static List<Double> averageWeights(List<ArrayList<Double>> weights) {
        ArrayList<Double> result = new ArrayList<>();
        if (weights.isEmpty()) {
            return result;
        }
        // TreeMap keeps feature indices sorted, so iterating values() below
        // yields the averages in index order. Natural Integer ordering is
        // sufficient — no custom comparator needed.
        Map<Integer, Double> sums = new TreeMap<>();
        for (List<Double> weight : weights) {
            for (int i = 0; i < weight.size(); i++) {
                sums.merge(i, weight.get(i), Double::sum);
            }
        }
        final int groupCount = weights.size();
        for (double sum : sums.values()) {
            result.add(sum / groupCount);
        }
        return result;
    }
}
